package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Demo13Patition {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("partition")
      .set("spark.default.parallelism", "23") //shuffle 之后默认并行度

    val sc = new SparkContext(conf)

    /**
      * 分区生产规则
      * 1、默认一个block对应一个分区, 一个task处理128M的数据
      * 2、可以设置最小分区数，实际分区数会根据文件数量进行计算，保证文件能被分开
      * 3、吐过block的数量比最小分区数大，一block数量为准
      */

    val linesRDD: RDD[String] = sc.textFile("data/words", 2)

    println("linesRDD分区数据：" + linesRDD.getNumPartitions)

    //没有shuffle算子生成的rdd分区数等于上一个rdd的分区数
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))

    println("wordsRDD分区数：" + wordsRDD.getNumPartitions)

    /**
      * shuufle 之后rdd分区数
      * 1、如果不指定默认等于前一个rdd分区数
      * 2、可以手动执行分区数 （numPartitions）
      * 3、设置默认并行度spark.default.parallelism
      *
      * 优先级
      * 手动指定---> spark.default.parallelism ---> 前一个rdd分区数
      *
      *
      * 分区数越多--> task 越多---> 计算并行度越高---> 任务越快( 导致产生很多小文件，浪费计算资源)
      */

    val groupByRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy((w: String) => w, 100)

    println("groupBy分区数：" + groupByRDD.getNumPartitions)

    //groupByRDD.foreach(println)


    val myParttionRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy((w: String) => w, new MyPartition)


    println("myParttionRDD分区数:" + myParttionRDD.getNumPartitions)


    /**
      * repartition: 没有实际的业务逻辑，只是修改rdd分区数据，但是会产生shuffle
      * repartition : 既可以提高分区也可以减少分区
      *
      * coalesce: 修改分区数据，如果不产生shuufle ,不能用于提高分区数据
      *
      * coalesce（shuffle=false）： 一般用于合并小文件，不产生shuffle ,效率高
      *
      */

    val rePartitionRDD: RDD[(String, Iterable[String])] = myParttionRDD.repartition(1000)


    println("rePartitionRDD分区数据：" + rePartitionRDD.getNumPartitions)


    val coalesceRDD: RDD[(String, Iterable[String])] = rePartitionRDD.coalesce(10, false)

    println("coalesceRDD分区数：" + coalesceRDD.getNumPartitions)

    /*
        while (true) {
        }*/

  }

}


/**
  * 自定义分区，默认是hash分区
  *
  */
class MyPartition extends Partitioner {

  //指定rdd分区数
  override def numPartitions: Int = 100

  /**
    * spark 在shuffle的时候会调用这个方法来获取分区数
    *
    */
  override def getPartition(key: Any): Int = {
    ///hash 分区
    math.abs(key.hashCode()) % numPartitions
  }
}

